import pandas as pd
import numpy as np
import numpy.linalg as la
import datetime
import statsmodels.api as sm
import datetime
from tqdm import tqdm
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler, MinMaxScaler
import matplotlib.pyplot as plt
from scipy.stats import zscore
from statsmodels.regression.linear_model import OLS
from statsmodels.tools.tools import add_constant
import plotly.express as px
import plotly.graph_objects as go
import warnings
warnings.filterwarnings("ignore", category=RuntimeWarning)
class Functions:
    """
    A class that provides various functionalities for data analysis and visualization
    in the context of financial time series and regression analysis. It includes methods
    for plotting, principal component analysis, and regression.

    Methods:
    - time_plot(data, name, line_thickness=1, digits=4, return_fig=False): time-series line plot with summary statistics.
    - eigenweights(df_eigen, eigen_weight, date, title): plots eigenweights for a specified eigenvalue and date.
    - plot_hist(data, title, x_label=None, y_label=None, bins=None, return_fig=False): histogram with summary statistics.
    - pca(data): principal component analysis; returns the first two eigenvectors.
    - regress_part1(df, ticker): regression of a ticker's returns on the two factors.
    - regress_part2(df): AR(1)-style regression of the cumulative residual process.
    - scale_weights(df, weight_column, scaled_column): min-max scales a weight column.
    - calculate_eigen_portfolio(df, return_column, weight_column, eigen_column): weighted returns.
    - cumulative_returns(df, columns): compounded cumulative returns per column.
    """

    def __init__(self):
        pass

    @staticmethod
    def time_plot(data, name, line_thickness=1, digits=4, return_fig=False):
        """
        Plots a time series line plot for the given data with summary statistics.

        :param data: DataFrame with a 'startTime' column plus one column per series.
        :param name: Title of the plot.
        :param line_thickness: Thickness of the lines (default is 1).
        :param digits: Number of digits to display for mean and standard deviation (default is 4).
        :param return_fig: When True, return the plotly figure after showing it.
        """
        fig = go.Figure()
        # Melt the data to long format.  BUGFIX: exclude 'startTime' from
        # value_vars -- passing data.columns unfiltered also melts the id
        # column itself, creating a bogus 'startTime' trace and mixing
        # datetimes into the mean/std computed below.
        value_cols = [col for col in data.columns if col != 'startTime']
        melted_data = pd.melt(data, id_vars='startTime', value_vars=value_cols)
        # One trace per series.
        for variable in melted_data['variable'].unique():
            subset = melted_data[melted_data['variable'] == variable]
            fig.add_trace(go.Scatter(
                x=subset['startTime'],
                y=subset['value'],
                mode='lines+markers',
                name=variable,
                line=dict(width=line_thickness)  # Set line thickness here
            ))
        # Summary statistics over all plotted values (np.std = population std).
        mean_value = np.mean(melted_data['value'])
        std_value = np.std(melted_data['value'])
        # Annotate mean and std near the top-right of the plot area.
        fig.add_annotation(
            go.layout.Annotation(
                text=f"Mean: {mean_value:.{digits}f}<br>Std Dev: {std_value:.{digits}f}",
                x=0.7,  # paper-relative x-coordinate
                y=0.9,  # paper-relative y-coordinate
                xref="paper",
                yref="paper",
                showarrow=False,
                align="left"
            )
        )
        # Customize layout
        fig.update_layout(
            title=name,
            xaxis=dict(title='Time'),
            yaxis=dict(title='Value'),
            showlegend=True,
            plot_bgcolor='lightgray',
            paper_bgcolor='white',
        )
        # Show the plot
        fig.show()
        if return_fig:
            return fig

    @staticmethod
    def eigenweights(df_eigen, eigen_weight, date, title):
        """
        Plots eigenweights for a specified eigenvalue and date.

        :param df_eigen: DataFrame containing eigenvalues (long, with 'index' and 'startTime').
        :param eigen_weight: Row label in 'index' whose weights are to be plotted.
        :param date: Specific 'startTime' value for plotting.
        :param title: Title of the plot.
        """
        # Select the one row matching the requested weight type and timestamp.
        df_eigenWeight = df_eigen[df_eigen['index'] == eigen_weight]
        df_eigenWeight = df_eigenWeight[df_eigenWeight['startTime'] == date]
        del df_eigenWeight['startTime'], df_eigenWeight['index']
        # Transpose so tickers become the index; sort by weight descending.
        df_eigenWeight = df_eigenWeight.T
        df_eigenWeight.rename(columns={df_eigenWeight.columns[0]: 'EigenWeight'}, inplace=True)
        df_eigenWeight = df_eigenWeight.sort_values(by=['EigenWeight'], ascending=False)
        df_eigenWeight.dropna(inplace=True)
        fig = px.line(df_eigenWeight)
        fig.update_layout(title_text=title)
        fig.show()

    @staticmethod
    def plot_hist(data, title, x_label=None, y_label=None, bins=None, return_fig=False):
        """
        Plots a histogram for the given data with customization options and summary statistics.

        :param data: DataFrame or Series containing the data to plot.
        :param title: Title of the plot.
        :param x_label: Label for the x-axis (optional).
        :param y_label: Label for the y-axis (optional).
        :param bins: Number of bins for the histogram (optional).
        :param return_fig: When True, return the plotly figure after showing it.
        """
        fig = go.Figure()
        # Create a histogram trace
        hist_trace = go.Histogram(
            x=data,
            nbinsx=bins  # Adjust the number of bins as needed
        )
        fig.add_trace(hist_trace)
        # Summary statistics (np.std = population std).
        mean_value = np.mean(data)
        std_value = np.std(data)
        # Annotate mean and std near the top-right of the plot area.
        fig.add_annotation(
            go.layout.Annotation(
                text=f"Mean: {mean_value:.4f}<br>Std Dev: {std_value:.4f}",
                x=0.7,
                y=0.9,
                xref="paper",
                yref="paper",
                showarrow=False,
                align="left"
            )
        )
        # Customize layout
        fig.update_layout(
            title=title,
            xaxis_title=x_label,
            yaxis_title=y_label,
            showlegend=False,  # Hide legend for a histogram
            plot_bgcolor='lightgray',
            paper_bgcolor='white',
        )
        # Show the plot
        fig.show()
        if return_fig:
            return fig

    @staticmethod
    def pca(data):
        """
        Performs principal component analysis on the given data.

        :param data: DataFrame containing the data for PCA.
        :return: DataFrame containing the eigenvectors of the two largest eigenvalues.
        """
        # Standardize, then eigendecompose the covariance matrix.
        scaler = StandardScaler()
        data = scaler.fit_transform(data)
        R = np.cov(data.T)
        evals, evecs = la.eigh(R)
        # Sort eigenpairs by descending eigenvalue and keep the top two
        # eigenvectors (the eigenvalues themselves are not returned).
        idx = np.argsort(evals)[::-1]
        evecs = evecs[:, idx]
        evals = evals[idx]
        evecs = evecs[:, :2]
        evecs = pd.DataFrame(evecs)
        return evecs

    @staticmethod
    def regress_part1(df, ticker):
        """
        Performs the first part of regression analysis: the ticker's returns
        regressed on the two factor return series.

        :param df: DataFrame containing the data (must hold 'Factor1', 'Factor2' and the ticker column).
        :param ticker: The ticker for which regression is to be performed.
        :return: Residual series of the regression model.
        """
        # NOTE(review): an 'Intercept' column is added to df (mutating the
        # caller's frame) but is NOT included in X, so this is a
        # no-intercept regression on the two factors -- confirm intended.
        df['Intercept'] = 1
        X = df[['Factor1', 'Factor2']]
        y = df[ticker]
        model = sm.OLS(y, X).fit()
        residual = model.resid
        return residual

    @staticmethod
    def regress_part2(df):
        """
        Performs the second part of regression analysis: an AR(1)-style fit of
        the shifted cumulative residuals on the cumulative residuals.

        :param df: DataFrame containing 'cum_sum' and 'cum_sum_shift'.
        :return: One-column DataFrame with slope ('cum_sum'), 'Intercept' and
                 residual variance stacked as rows.
        """
        df['Intercept'] = 1
        X = df[['cum_sum', 'Intercept']]
        y = df['cum_sum_shift']
        model = sm.OLS(y, X).fit()
        coef = model.params
        resid = (model.resid).var()
        df1 = pd.DataFrame(data=coef.values, index=coef.index).T
        df1['residual_var'] = resid
        df1 = df1.T
        return df1

    @staticmethod
    def scale_weights(df, weight_column, scaled_column):
        """
        Min-max scales *weight_column* into a new column *scaled_column*.

        :param df: DataFrame to modify (returned for chaining).
        :param weight_column: Name of the raw weight column.
        :param scaled_column: Name of the new scaled column.
        :return: The same DataFrame with the scaled column added.
        """
        mms = MinMaxScaler()
        df[scaled_column] = mms.fit_transform(df[[weight_column]])
        return df

    @staticmethod
    def calculate_eigen_portfolio(df, return_column, weight_column, eigen_column):
        """
        Adds *eigen_column* = *return_column* * *weight_column* (per-row
        weighted return contribution).

        :param df: DataFrame to modify (returned for chaining).
        :return: The same DataFrame with the new column added.
        """
        df[eigen_column] = df[return_column] * df[weight_column]
        return df

    @staticmethod
    def cumulative_returns(df, columns):
        """
        For each column in *columns*, adds 'total_<col>' = 1 + <col> and
        'cum_<col>' = running product of the totals (compounded return).

        BUGFIX: decorated with @staticmethod like every sibling method --
        without it, an instance call would bind *df* to self.

        :param df: DataFrame of per-period returns (returned for chaining).
        :param columns: Iterable of return-column names to compound.
        :return: The same DataFrame with the new columns added.
        """
        for column in columns:
            total_column = f"total_{column}"
            cum_column = f"cum_{column}"
            df[total_column] = 1 + df[column]
            df[cum_column] = df[total_column].cumprod()
        return df
class CryptoDataProcessor:
    """
    A class for processing cryptocurrency data, including merging price and ticker data,
    calculating eigenvectors, weights, and coefficients related to cryptocurrency portfolios.
    """

    def __init__(self):
        """
        Initializes the CryptoDataProcessor class.
        """
        pass

    def read_and_process_data(self, price_file, ticker_file):
        """
        Reads cryptocurrency price and ticker data from CSV files, processes them by
        converting the 'startTime' to datetime format and ensuring inclusion of 'ETH' and 'BTC'.
        Merges the price and ticker dataframes on 'startTime'.

        :param price_file: Filepath for the CSV file containing price data.
        :param ticker_file: Filepath for the CSV file containing ticker data.
        :return: Merged DataFrame of price and ticker data.
        """
        df_price = pd.read_csv(price_file)
        df_price['startTime'] = pd.to_datetime(df_price['startTime'])
        df_ticker = pd.read_csv(ticker_file)
        df_ticker['startTime'] = pd.to_datetime(df_ticker['startTime'])
        # The universe file stores its tickers in columns named '0'..'39';
        # collect them into a per-row set and force BTC and ETH into it.
        ticker_cols = [str(i) for i in range(40)]
        df_ticker['ticker_list'] = df_ticker[ticker_cols].values.tolist()
        df_ticker['ticker_list'] = df_ticker.apply(lambda x: set(x['ticker_list'] + ['BTC', 'ETH']), axis=1)
        df_merge = pd.merge(df_price, df_ticker, on='startTime', how='inner')
        return df_merge

    def eigen(self, data):
        """
        Computes the eigenvectors and weights from the given data.

        :param data: DataFrame of returns (tickers as columns).
        :return: (evecs1, Factor1, Factor2) -- per-ticker eigenvectors,
                 std-devs and derived weights, plus the two factor return
                 series data . w_eig and data . w_eig2.
        """
        std = data.std(skipna=True)
        evecs = Functions.pca(data)  # first two principal eigenvectors
        evecs.index = std.index
        std = pd.DataFrame(std)
        evecs = evecs.reset_index()
        std = std.reset_index()
        # Align eigenvectors with per-ticker standard deviations on 'index'.
        evecs1 = evecs.merge(std, on='index')
        evecs1 = evecs1.set_axis(['Ticker', 'EigVec1', 'EigVec2', 'Std'], axis=1)
        # Volatility-scaled eigen-portfolio weights: w_i = v_i / sigma_i.
        # Division by a zero std yields inf, zeroed out just below.
        evecs1['w_eig'] = evecs1['EigVec1'] / evecs1['Std']
        evecs1['w_eig2'] = evecs1['EigVec2'] / evecs1['Std']
        evecs1.replace([np.nan, np.inf, -np.inf], 0, inplace=True)
        # Factor returns: returns matrix dotted with each weight vector.
        Factor1 = data.dot(evecs1['w_eig'].to_numpy())
        Factor2 = data.dot(evecs1['w_eig2'].to_numpy())
        return evecs1, Factor1, Factor2

    def coef(self, df, ticker_list):
        """
        Calculates coefficients for each ticker in the ticker list.

        :param df: DataFrame with ticker returns plus 'Factor1'/'Factor2'.
        :param ticker_list: List of tickers to calculate coefficients for.
        :return: DataFrame with calculated coefficients for each ticker.
        """
        coef_ab = pd.DataFrame()
        dict_residual_sum = {}
        for i in ticker_list:
            if str(i) == 'nan':
                continue  # skip empty slots in the universe
            residual_error = pd.DataFrame()
            # Step 1: residuals of the ticker's returns on the two factors.
            residual_error[i] = Functions.regress_part1(df, i)
            # Step 2: build the cumulative residual process X_t and its
            # one-step-ahead shift, then fit X_{t+1} ~ X_t.
            residual_error['cum_sum'] = residual_error[i].cumsum(skipna=True)
            residual_error['cum_sum_shift'] = residual_error['cum_sum'].shift(-1)
            residual_error.replace([np.nan, np.inf, -np.inf], 0, inplace=True)
            dict_residual_sum[i] = residual_error[i].sum(skipna=True)
            coef_ab[i] = Functions.regress_part2(residual_error)
        coef_ab = coef_ab.T
        # Map the AR(1) slope (stored under 'cum_sum') and intercept to
        # OU-style parameters; 8760 hours/year annualizes the speed k.
        coef_ab['k'] = coef_ab.apply(lambda x: -1 * np.log(x['cum_sum']) * 8760, axis=1)
        coef_ab['m'] = coef_ab.apply(lambda x: x['Intercept'] / (1 - x['cum_sum']), axis=1)
        coef_ab['sigma'] = coef_ab.apply(lambda x: np.sqrt((x['residual_var'] * 2 * x['k']) / (1 - x['cum_sum'] ** 2)), axis=1)
        coef_ab['sigma_eq'] = coef_ab.apply(lambda x: np.sqrt((x['residual_var']) / (1 - x['cum_sum'] ** 2)), axis=1)
        coef_ab.replace([np.nan, np.inf, -np.inf], 0, inplace=True)
        dict_residual_sum = pd.DataFrame(dict_residual_sum.items(), columns=['ticker', 'X'])
        coef_ab = coef_ab.reset_index()
        coef_ab.rename(columns={'index': "ticker"}, inplace=True)
        coef_ab = coef_ab.merge(dict_residual_sum, on='ticker')
        # s-score: standardized distance of X from its equilibrium mean m
        # (0 when sigma_eq is 0 to avoid division by zero).
        coef_ab['S'] = coef_ab.apply(lambda x: (x['X'] - x['m']) / x['sigma_eq'] if x['sigma_eq'] != 0 else 0, axis=1)
        return coef_ab
class MeanReversionStrategy:
    """
    Mean-reversion trading strategy: transitions each ticker between flat,
    long and short states when its s-score crosses fixed threshold levels.
    """

    def __init__(self):
        """Set the s-score thresholds that trigger signal transitions."""
        self.sell_open_threshold = 1.25   # open a short above this level
        self.buy_open_threshold = -1.25   # open a long below this level
        self.sell_close_threshold = 0.75  # close a short below this level
        self.buy_close_threshold = -0.5   # close a long above this level

    def generate_signals(self, df_coeff, signal_dict):
        """
        Generates trading signals based on the mean reversion strategy.

        :param df_coeff: DataFrame containing coefficients ('ticker', 'S') per row.
        :param signal_dict: Dictionary holding the current signal state for each ticker.
        :return: Updated DataFrame with generated signals.
        """
        for idx, row in df_coeff.iterrows():
            ticker = row['ticker']
            state = signal_dict.get(ticker, 0)
            score = row['S']
            if state == 0:
                # Flat: open a position when the score leaves the band.
                if score < self.buy_open_threshold:
                    state = 1
                    signal_dict[ticker] = 1
                elif score > self.sell_open_threshold:
                    state = -1
                    signal_dict[ticker] = -1
                df_coeff.at[idx, 'signal'] = state
            elif state == 1:
                # Long: close once the score has reverted upward enough.
                if score > self.buy_close_threshold:
                    state = 0
                    signal_dict[ticker] = 0
                df_coeff.at[idx, 'signal'] = state
            elif state == -1:
                # Short: close once the score has reverted downward enough.
                if score < self.sell_close_threshold:
                    state = 0
                    signal_dict[ticker] = 0
                df_coeff.at[idx, 'signal'] = state
        return df_coeff
# ---- Backtest driver: rolling 240-hour estimation loop ----
processor = CryptoDataProcessor()
price_data = pd.read_csv('coin_all_prices_full.csv')
price_data['startTime'] = pd.to_datetime(price_data['startTime'])
# Hourly prices inner-joined with the per-hour ticker universe ('ticker_list').
df_merge = processor.read_and_process_data('coin_all_prices_full.csv', 'coin_universe_150K_40.csv')
coefficients = Functions()
# dict1: per-hour return windows; dict2: eigenvector frames; dict3: fitted coefficients.
dict1, dict2, dict3 = {}, {}, {}
for index, row in tqdm(df_merge.iterrows()):
    # Extract relevant tickers: active this hour AND present as price columns.
    relevant_tickers = list(set(df_merge.columns).intersection(df_merge['ticker_list'][index]))
    current_data = df_merge[relevant_tickers].copy()
    # Add helper columns: observation time, trade hour, and their difference.
    current_data['startTime'] = df_merge['startTime']
    current_data['trade_start'] = row['startTime']
    current_data['Diff'] = current_data['trade_start'] - current_data['startTime']
    # Keep the trailing estimation window: 1..241 hours before the trade hour.
    current_data = current_data[(current_data['Diff'] > datetime.timedelta(hours=0)) &
                                (current_data['Diff'] <= datetime.timedelta(hours=241))]
    # Hourly percent returns for the price columns (all but the last three
    # helper columns); drop the first row lost to pct_change.
    current_data.iloc[:, :-3] = current_data.iloc[:, :-3].pct_change()
    current_data = current_data.iloc[1:, :]
    current_data.replace([np.nan, np.inf, -np.inf], 0, inplace=True)
    # Require a full 240-observation window before fitting anything.
    if current_data.shape[0] < 240:
        continue
    # PCA eigenvectors/weights and the two factor return series for this window.
    factors = CryptoDataProcessor()
    evecs1, Factor1, Factor2 = factors.eigen(current_data.iloc[:, :-3])
    # Attach the factor series; index-align via pd.Series.
    current_data['Factor1'] = pd.Series(Factor1)
    current_data['Factor2'] = pd.Series(Factor2)
    current_data.replace([np.nan, np.inf, -np.inf], 0, inplace=True)
    # Transpose evecs1 so tickers become columns; the first row holds the names.
    evecs1 = evecs1.T
    evecs1.columns = evecs1.iloc[0]
    evecs1['startTime'] = row['startTime']
    evecs1 = evecs1.iloc[1:, :]
    # Stash window data, eigenvectors and fitted coefficients keyed by hour.
    dict1[row['startTime']] = current_data
    dict2["evecs" + str(row['startTime'])] = evecs1
    dict3["coef" + str(row['startTime'])] = factors.coef(current_data, df_merge['ticker_list'][index])
    dict3["coef" + str(row['startTime'])]['startTime'] = row['startTime']
# (notebook output, not code) 14015it [1:12:52, 3.21it/s]
# Stack every per-hour eigenvector frame into a single table (pd.concat is
# far cheaper than appending inside a loop), then move 'startTime' to the
# leading column position.
df_eigen = pd.concat(tqdm(dict2.values())).reset_index()
remaining = [col for col in df_eigen.columns if col != 'startTime']
df_eigen = df_eigen[['startTime'] + remaining]
# Helper: reshape one weight-row type of df_eigen into long format.
def process_weights(df, weight_indicator, value_name):
    """
    Select the rows of *df* whose 'index' equals *weight_indicator*, drop the
    'index' column, and melt into long format with columns
    startTime / ticker / *value_name*.
    """
    selected = df.loc[df['index'] == weight_indicator].copy()
    selected = selected.drop(columns='index')
    return selected.melt(id_vars=['startTime'], var_name='ticker', value_name=value_name)
# Build the long-format tables for eigen weight 1 and eigen weight 2.
df_eigen_w1, df_eigen_w2 = (
    process_weights(df_eigen, indicator, column)
    for indicator, column in (('w_eig', 'Weight1'), ('w_eig2', 'Weight2'))
)
# (notebook output, not code) 100%|██████████| 13774/13774 [00:00<00:00, 1743281.33it/s]
# Generate per-ticker trading signals from the fitted coefficients.
tradingsignal = MeanReversionStrategy()
# Combine every hourly coefficient frame from dict3 into one DataFrame.
df_coeff = pd.concat(list(tqdm(dict3.values())), ignore_index=True)
# Every ticker starts flat (state 0).
signal_dict = dict.fromkeys(df_coeff['ticker'].drop_duplicates(), 0)
# 'signal' is filled in row by row by the strategy below.
df_coeff["signal"] = np.nan
df_coeff = tradingsignal.generate_signals(df_coeff, signal_dict)
# Wide signal matrix: rows are timestamps, columns are tickers.
df_coeff_signal = df_coeff.pivot(index='startTime', columns='ticker', values='signal')
# Export the trading-signal file if necessary:
# df_coeff_signal.to_csv('trading_signals.csv')
# (notebook output, not code) 100%|██████████| 13774/13774 [00:00<00:00, 3058194.02it/s]
# Long-format prices -> per-ticker hourly returns.
df_price_transformed = price_data.melt(id_vars=["startTime", "time"], var_name="ticker", value_name="price")
df_price_transformed.sort_values(by=['ticker', 'startTime'], inplace=True)
df_price_transformed['ret'] = df_price_transformed.groupby('ticker')['price'].pct_change()
# Shift back one step so 'ret' at time t is the return realised over t -> t+1
# (the return the signal at t would capture).
df_price_transformed['ret'] = df_price_transformed.groupby('ticker')['ret'].shift(-1)
# Merge the transformed price data with df_coeff
df_coeff_final = pd.merge(df_coeff, df_price_transformed, on=['startTime', 'ticker'])
# Replace NaN, inf, and -inf in 'ret' with 0
# NOTE(review): chained-assignment inplace replace on a column; relies on the
# extracted Series mutating the frame -- consider assigning the result instead.
df_coeff_final['ret'].replace([np.nan, np.inf, -np.inf], 0, inplace=True)
# Calculate traded returns: realised next-hour return times position (-1/0/1).
df_coeff_final['traded_ret'] = df_coeff_final['ret'] * df_coeff_final['signal']
# Join signals/returns with the two eigenvector weight tables on (time, ticker).
merged_df = df_coeff_final.merge(df_eigen_w1, on=['startTime', 'ticker'], how='inner')
merged_df = merged_df.merge(df_eigen_w2, on=['startTime', 'ticker'], how='inner')
# Min-max scale each raw weight column.
for raw, scaled in (('Weight1', 'Scaled_Weight1'), ('Weight2', 'Scaled_Weight2')):
    merged_df = Functions.scale_weights(merged_df, raw, scaled)
# Per-row weighted-return contributions for the two eigen portfolios.
for scaled, pf in (('Scaled_Weight1', 'Eigen_pf1'), ('Scaled_Weight2', 'Eigen_pf2')):
    merged_df = Functions.calculate_eigen_portfolio(merged_df, 'ret', scaled, pf)
# Hourly portfolio return = sum of contributions per hour; then compound.
grouped_df = merged_df.groupby('startTime')[['Eigen_pf1', 'Eigen_pf2']].sum()
grouped_df = Functions.cumulative_returns(grouped_df, ['Eigen_pf1', 'Eigen_pf2'])
# Benchmark series: ETH and BTC cumulative returns computed from raw prices.
df_price_secs = price_data[['startTime', 'ETH', 'BTC']].copy()
# Backfill missing ETH prices before differencing (BTC is left as-is).
df_price_secs['ETH'] = df_price_secs['ETH'].bfill(axis='rows')
df_price_secs.iloc[:, 1:] = df_price_secs.iloc[:, 1:].pct_change()
df_price_secs = df_price_secs.iloc[1:, :]
# Compound the hourly returns into cumulative growth factors.
df_price_secs.iloc[:, 1:] = (1 + df_price_secs.iloc[:, 1:]).cumprod()
# Join with the eigen-portfolio cumulative returns and plot all series together.
cumulative_returns_df = df_price_secs.merge(grouped_df[['cum_Eigen_pf1', 'cum_Eigen_pf2']], on='startTime')
Functions.time_plot(cumulative_returns_df, 'Cummulative Returns')  # title spelling kept as-is (runtime string)
# Export the raw eigenvector rows (Task 1a) without the 'index' label column.
df_eigenvec1 = df_eigen.loc[df_eigen['index'] == 'EigVec1'].drop(columns='index')
df_eigenvec2 = df_eigen.loc[df_eigen['index'] == 'EigVec2'].drop(columns='index')
df_eigenvec1.to_csv('task1a_1.csv')
df_eigenvec2.to_csv('task1a_2.csv')
# Eigen-portfolio weight plots at two representative timestamps.
for snapshot, label in (('2021-09-26 12:00:00+00:00', '2021-09-26'),
                        ('2022-04-15 20:00:00+00:00', '2022-04-15')):
    Functions.eigenweights(df_eigen, 'w_eig', snapshot, f'Eigen_Wgt1_for_{label}')
    Functions.eigenweights(df_eigen, 'w_eig2', snapshot, f'Eigen_Wgt2_for_{label}')
# S-score trends for BTC and ETH over a roughly one-month window.
window_start = '2021-09-26 06:00:00+00:00'
window_end = '2021-10-25 18:00:00+00:00'
BTC_df = df_coeff[df_coeff['ticker'] == 'BTC']
BTC_df = BTC_df[(BTC_df['startTime'] >= window_start) & (BTC_df['startTime'] <= window_end)]
ETH_df = df_coeff[df_coeff['ticker'] == 'ETH']
ETH_df = ETH_df[(ETH_df['startTime'] >= window_start) & (ETH_df['startTime'] <= window_end)]
Functions.time_plot(BTC_df[['startTime', 'S']], 'BTC Signal score trend')
Functions.time_plot(ETH_df[['startTime', 'S']], 'ETH Signal score trend')
# Equal-weight portfolio: average traded return across tickers per hour.
df_final = df_coeff_final.groupby('startTime')['traded_ret'].mean()
df_final = pd.DataFrame(df_final)
df_final['ret'] = 1 + df_final['traded_ret']
# Compound hourly portfolio returns into a wealth curve.
df_final['cum_ret'] = df_final['ret'].cumprod()
df_final = df_final.reset_index()
histogram = Functions.plot_hist(df_final['traded_ret'],'Traded Returns Histogram', return_fig = True)
cumulativereturn = Functions.time_plot(df_final[['startTime','cum_ret']], 'Cumulative_Return_on_Portfolio', line_thickness = 0.3, return_fig = True)
# Per-hour Sharpe ratio (not annualised, no risk-free adjustment).
sharpe_ratio = df_final['traded_ret'].mean() / df_final['traded_ret'].std()
window = 1000
# NOTE(review): this drawdown is computed on the raw hourly returns rather
# than on the wealth curve 'cum_ret'; a ratio of returns can exceed -100%
# and flips sign when Roll_Max is negative -- confirm intended.
df_final['Roll_Max'] = df_final['traded_ret'].rolling(window, min_periods=1).max()
df_final['Daily_Drawdown'] = df_final['traded_ret']/df_final['Roll_Max'] - 1.0
df_final['Max_Daily_Drawdown'] = df_final['Daily_Drawdown'].rolling(window, min_periods=1).min()
Functions.time_plot(df_final[['startTime','Daily_Drawdown','Max_Daily_Drawdown']],'Maximum Drawdown Plot with 1000 hour window')
import plotly.io as pio

# BUGFIX: drawdown must be measured on the cumulative wealth curve, not on
# the raw per-period returns.  Dividing a return by its running max flips
# sign whenever the running max is negative and can produce drawdowns far
# below -100% (the original printed -411.77%).  'cum_ret' starts at ~1 and
# stays positive, so the ratio below is a proper peak-to-trough decline.
daily_drawdown = (df_final['cum_ret'] - df_final['cum_ret'].cummax()) / df_final['cum_ret'].cummax()
# Worst (most negative) drawdown over the whole backtest.
max_daily_drawdown = daily_drawdown.min()
# Print maximum daily drawdown in green (ANSI escape codes).
print("\033[32mMaximum Daily Drawdown: {:.2f}%\033[0m".format(max_daily_drawdown * 100))
# Print Sharpe Ratio
print("\033[32mSharpe_Ratio is {:.5f}\033[0m".format(round(sharpe_ratio, 5)))
print()
# Persist the two summary figures as JPEG files.
pio.write_image(cumulativereturn, 'cumulative_return.jpeg', format='jpeg')
pio.write_image(histogram, 'hist_return.jpeg', format='jpeg')
print("\033[32mImages saved to folder successfully.\033[0m")
# (notebook output, not code) Maximum Daily Drawdown: -411.77% Sharpe_Ratio is -0.03143 Images saved to folder successfully.